package com.xiaomaoguai.fcp.pre.kepler.router.async;

import com.lmax.disruptor.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 消息消费者
 * 继承Thread
 * run方法监听concurrentQueue消息 和ringbuffer消息并执行
 *
 * @param <O>
 * @author DH
 */
public class Processor<O> extends Thread {

	private static Logger log = LoggerFactory.getLogger(Processor.class);

	/**
	 * 消息监听开关
	 */
	private final AtomicBoolean running = new AtomicBoolean(false);

	private final SequenceBarrier sequenceBarrier;

	private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
	/**
	 * 线程池线程共用Sequence
	 */
	private final Sequence workSequence;

	/**
	 * 线程池创建的一级缓存队列ringbuffer，消费者从它里面获取消息
	 */
	private final RingBuffer<HandlerEvent<O>> ringBuffer;

	/**
	 * 线程池创建的二级缓存队列ConcurrentLinkedQueue，消费者从它里面获取被阻塞的消息
	 */
	private final ConcurrentLinkedQueue<HandlerEvent<O>> concurrentQueue;

	/**
	 * 线程池指定的实际处理者
	 */
	private WorkHandler<HandlerEvent<O>> workHandler;

	private AtomicInteger concurrentQueueSize;

	public Processor(final RingBuffer<HandlerEvent<O>> ringBuffer,
					 final ConcurrentLinkedQueue<HandlerEvent<O>> concurrentQueue, final SequenceBarrier sequenceBarrier,
					 final WorkHandler<HandlerEvent<O>> workHandler, final AtomicInteger concurrentQueueSize, final Sequence workSequence) {
		this.ringBuffer = ringBuffer;
		this.concurrentQueue = concurrentQueue;
		this.concurrentQueueSize = concurrentQueueSize;
		this.sequenceBarrier = sequenceBarrier;
		this.workHandler = workHandler;
		this.workSequence = workSequence;
	}

	/**
	 * 关闭当前线程消息监听
	 * sequenceBarrier关闭消息阻塞，抛出alert异常，停止获取ringbuffer消息
	 * 等待当前线程消费完所有可消费的消息
	 */
	public void halt() {
		running.set(false);
		sequenceBarrier.alert();
		try {
			interrupt();
			join();
		} catch (InterruptedException e) {
		}
	}

	public boolean isRunning() {
		return running.get();
	}

	@Override
	public void run() {
		if (!running.compareAndSet(false, true)) {
			throw new IllegalStateException("Processor is already running");
		}
		handleEvent();
	}

	public void handleEvent() {
		// 清除提醒，alert为false
		// 当为true时，sequenceBarrier.waitFor
		// 中checkAlert会抛出AlertException，消费者将停止监听消费
		sequenceBarrier.clearAlert();
		long cachedAvailableSequence = Long.MIN_VALUE;
		long nextSequence = -1L;
		boolean processedSequence = true;
		while (true) {
			try {
				if (processedSequence) {//调整序号，只有首次或者有可消费的消息才执行
					processedSequence = false;
					do {
						nextSequence = workSequence.get() + 1L;
						sequence.set(nextSequence - 1L);
					} while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
				}
				if (cachedAvailableSequence >= nextSequence) {//有可以消费的消息
					HandlerEvent<O> event = ringBuffer.get(nextSequence);
					workHandler.onEvent(event);
					processedSequence = true;
				} else {//ringbuffer无可消费的消息时，执行完concurrentQueue缓存的消息后阻塞
					if (!concurrentQueueConsume())
						continue;
					cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
				}
			} catch (final AlertException ex) {
				if (!running.get()) {//关闭监听退出
					break;
				}
			} catch (Throwable ex) {
				log.error("ringbuffer event handle error ", ex);
				processedSequence = true;
			}
		}
		running.set(false);
	}


	private boolean concurrentQueueConsume() {
		try {
			HandlerEvent<O> event = concurrentQueue.poll();
			if (event == null) {
				return true;
			}
			concurrentQueueSize.decrementAndGet();
			workHandler.onEvent(event);
		} catch (Throwable ex) {
			log.error(" blockQueue event handle error ", ex);
		}
		return false;
	}

	public Sequence getSequence()
	{
		return sequence;
	}

}
